/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.engine.meta.crawler

import com.chinamobile.cmss.lakehouse.engine.meta.crawler.util.FSUtils
import com.typesafe.config.Config
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}


/**
 * Bundles a Hadoop [[org.apache.hadoop.fs.FileSystem]] together with the
 * [[org.apache.hadoop.conf.Configuration]] and the [[org.apache.hadoop.fs.Path]]
 * that belong to it.
 *
 * The trait is sealed, so every direct implementation is declared in this file.
 */
sealed trait FileSystemHolder {

  /** The path to scan. */
  def scanPath: Path

  /** The Hadoop configuration associated with this holder. */
  def hadoopConfiguration: Configuration

  /** The Hadoop file system associated with this holder. */
  def fs: FileSystem
}

/** Factory and implementations for [[FileSystemHolder]]. */
object FileSystemHolder {

  /**
   * Creates the holder selected by the `source.type` entry of the job configuration.
   *
   * @param jobConfig job configuration; its `source.type` string picks the implementation
   * @return an [[OssHolder]] when `source.type` is `"oss"`
   * @throws RuntimeException when `source.type` has any other value
   */
  def apply(jobConfig: Config): FileSystemHolder =
    jobConfig.getString("source.type") match {
      case "oss" => new OssHolder(jobConfig)
      case unknown => throw new RuntimeException(s"unknown type $unknown")
    }

  /**
   * Holder configured for the Hadoop `s3a` file system.
   *
   * The configuration, file system and scan path are each created on first access and
   * then cached in the corresponding `var`. The caching is not synchronized.
   *
   * @param jobConfig job configuration providing `source.endpoint`, `source.ak`,
   *                  `source.sk` and `source.path`
   */
  class OssHolder(jobConfig: Config) extends FileSystemHolder {
    // Caches for the lazily created values; `None` means "not created yet".
    @transient var configuration: Option[Configuration] = None
    @transient var fileSystem: Option[FileSystem] = None
    @transient var path: Option[Path] = None

    /** Returns the cached file system, resolving it from [[scanPath]] on first use. */
    override def fs: FileSystem = fileSystem.getOrElse {
      val resolved: FileSystem = scanPath.getFileSystem(hadoopConfiguration)
      fileSystem = Some(resolved)
      resolved
    }

    /**
     * Returns the cached Hadoop configuration, building it on first use from fixed
     * `s3a` settings plus the endpoint and credentials found in the job configuration.
     */
    override def hadoopConfiguration: Configuration = configuration.getOrElse {
      val built = new Configuration()
      // Fixed settings first, then the job-specific ones, in the order they are applied.
      val settings = Seq(
        "fs.s3a.impl" -> "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "fs.file.impl" -> "org.apache.hadoop.fs.LocalFileSystem",
        "fs.s3a.connection.ssl.enabled" -> "false",
        "fs.s3a.path.style.access" -> "true",
        "fs.s3a.endpoint" -> jobConfig.getString("source.endpoint"),
        "fs.s3a.access.key" -> jobConfig.getString("source.ak"),
        "fs.s3a.secret.key" -> jobConfig.getString("source.sk")
      )
      settings.foreach { case (key, value) => built.set(key, value) }
      configuration = Some(built)
      built
    }

    /**
     * Returns the cached scan path, deriving it on first use by passing the
     * `source.path` setting through `FSUtils.wrapS3Path`.
     */
    override def scanPath: Path = path.getOrElse {
      val wrapped: Path = FSUtils.wrapS3Path(jobConfig.getString("source.path"))
      path = Some(wrapped)
      wrapped
    }
  }
}
